99db2a65f76bf0cc251568c7cba970789389d150,src/main/java/com/ociweb/pronghorn/network/ServerSocketReaderStage.java,ServerSocketReaderStage,run,#,92
Before Change
//logger.info("found new data to read on "+groupIdx);
// Take the selector's current selected-key set into a local variable and
// record how many keys it holds in selectorSize.
Set<SelectionKey> selectedKeys = selector.selectedKeys();
selectorSize = selectedKeys.size();
After Change
//logger.info("found new data to read on "+groupIdx);
// Take the selector's current selected-key set (stored in a variable declared
// outside this fragment) and record how many keys it holds.
selectedKeys = selector.selectedKeys();
selectorSize = selectedKeys.size();
// Reset doneSelectors before walking the keys.
// NOTE(review): its declaration and use are outside this fragment; presumably it
// collects keys that were fully handled during this pass - confirm.
doneSelectors.clear();
// Visit every selected key; each one is expected to be a readable socket channel.
for (SelectionKey selection: selectedKeys) {
// Only checked when assertions are enabled: the key must have OP_READ in its ready set.
assert(0 != (SelectionKey.OP_READ & selection.readyOps())) : "only expected read";
SocketChannel socketChannel = (SocketChannel)selection.channel();
//logger.info("is blocking {} open {} ", selection.channel().isBlocking(),socketChannel.isOpen());
//get the context object so we know what the channel identifier is
ConnectionContext connectionContext = (ConnectionContext)selection.attachment();
long channelId = connectionContext.getChannelId();
// For TLS, service the handshake state of this connection before reserving a pipe.
if (isTLS) {
SSLConnection cc = coordinator.get(channelId, groupIdx);
// Skip the handshake handling when no connection, or no engine, is found for this channel.
if (null!=cc && null!=cc.getEngine()) {
HandshakeStatus handshakeStatus = cc.getEngine().getHandshakeStatus();
if (HandshakeStatus.NEED_TASK == handshakeStatus) {
// Run every pending delegated task on this thread until the engine has none left.
Runnable task;//TODO: there is an opportunity to have this done by a different stage in the future.
while ((task = cc.getEngine().getDelegatedTask()) != null) {
task.run();
}
// Re-read the status after the tasks ran; the refreshed value is not read
// again within the visible part of this block.
handshakeStatus = cc.getEngine().getHandshakeStatus();
} else if (HandshakeStatus.NEED_WRAP == handshakeStatus) {
// The engine needs to wrap (produce outbound data), so nothing is read for this
// key on this pass: release pipes and move on to the next key.
releasePipesForUse();
// NOTE(review): this looks up the pipe by cc.getId() while the rest of the loop
// uses channelId - presumably the same value; confirm.
assert(-1 == coordinator.checkForResponsePipeLineIdx(cc.getId())) : "should have already been relased";
continue;//one of the other pipes can do work
}
}
}
// releasePipesForUse();
// Check whether a response pipe line index is already associated with this channel;
// a negative result is treated as "none yet".
int responsePipeLineIdx = coordinator.checkForResponsePipeLineIdx(channelId);
final boolean newBeginning = (responsePipeLineIdx<0);
if (newBeginning) {
//this release is required in case we are swapping pipe lines, we ensure that the latest sequence no is stored.
releasePipesForUse();
// Ask the coordinator for a pipe line index for this channel.
responsePipeLineIdx = coordinator.responsePipeLineIdx(channelId);
if (-1 == responsePipeLineIdx) { //handshake is dropped by input buffer at these loads?
// First attempt returned -1: release pipes once more and retry a single time.
releasePipesForUse();
responsePipeLineIdx = coordinator.responsePipeLineIdx(channelId);
if (responsePipeLineIdx<0) {
// itemsWithNoPipeCount++;
// if (--maxWarningCount>0) {//this should not be a common error but needs to be here to promote good configurations
// logger.warn("bump up maxPartialResponsesServer count, performance is slowed due to waiting for available input pipe on client");
// }
continue;//try other connections which may already have pipes, this one can not reserve a pipe now.
}
}
}